Scalaで並行処理#4 – AkkaのFault Tolerance(耐障害性)

Scalaで並行処理#4 – AkkaのFault Tolerance(耐障害性)

Clock Icon2012.01.31

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

Supervisorによる耐障害性

Akkaでのエラーや障害に対する対処は「let it crash」というアプローチをとっています。 この考え方は障害は発生しうるものという前提に基づき、障害が発生したらプロセスを適切に再スタートさせるという考え方です。
そのためにAkkaではSupervisorとよばれるものがActorを管理し、監視対象のActorに障害が発生した際、
プロセスを再スタートさせ、プロセスを生かし続けます。こうしてAkkaは耐障害性を確保しています。

Supervisorが行う再スタート方法

AkkaのSupervisorは二種類の再スタート方法を持っています。
1.One-For-One
管理対象のうちいずれかのプロセスがクラッシュしたら、その対象のコンポーネントのみを再スタートさせます。
2.All-For-One
Supervisorも含む、管理対象のうちいずれかのプロセスがクラッシュしたら、管理しているすべてのコンポーネントを再スタートさせます。

とりあえずサンプルを作成してためしてみましょう。

今回使用した動作環境は以下のとおりです。

  • OS : MacOS X 10.7.2
  • Java : 1.6.0_26
  • Scala : 2.9.1 final
  • SBT : 0.11.2

実行環境のセットアップ

#2の記事で作成したAkkaサンプルを使用します。その記事を参考にプロジェクトを作成しておきましょう。

サンプル実行

AllForOne(1つがエラーになったら全部再スタートする)を試してみましょう。
MyAkka.scalaの中身を下記のようにしてください。

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.Supervisor
import akka.config.Supervision._

/**
 * FirstErrorActor用のエラーフラグ
 */
object Flag {
 var isError = true
}

/**
 * 1回目はエラーになるアクター
 */
class FirstErrorActor extends Actor {

  
  self.start()

  def receive = { 
    case msg => { 
      println(msg + " by FirstErrorActor")
      if(Flag.isError) throw new Exception("FirstErrorActor Error")
      println("FirstErrorActor receive done.")
    }
  }

  override def preRestart(reason: Throwable) = {
	  println("FirstErrorActor pre Restart.")
	  Flag.isError = false
  }

  override def postRestart(reason: Throwable) = {
	  println("FirstErrorActor post Restart.")
  }
}

/**
 * 通常のアクター
 */
class NomalActor extends Actor {


  self.start()

  def receive = { 
    case msg => { 
      println(msg + " by NomalActor")
    }
  }

  override def preRestart(reason: Throwable) = {
	  println("NomalActor pre Restart.")
  }

  override def postRestart(reason: Throwable) = {
	  println("NomalActor post Restart.")
  }
}


object Main extends App {

  val myAkkaActor1 = actorOf(new FirstErrorActor())
  val myAkkaActor2 = actorOf(new NomalActor())

  val supervisor = Supervisor(
	SupervisorConfig(
      AllForOneStrategy(List(classOf[Exception]), 3, 1000),
      Supervise(myAkkaActor1 , Permanent) :: Supervise(myAkkaActor2 , Permanent) :: Nil
	)
  ).start

  println("-----NomalActor receive-----")
  myAkkaActor2 ! "success!" 

  Thread.sleep(5000)

  println("-----FirstErrorActor receive(Error)-----")
  myAkkaActor1 ! "error!" 

  Thread.sleep(5000)

  println("-----FirstErrorActor receive(Success)-----")
  myAkkaActor1 ! "success!" 

}

まずはアクターの説明です。必要なパッケージをimportしたら、2つのアクターを定義します。
FirstErrorActorは1度目の実行では例外を投げるアクターです。
NomalActorはそのまま問題なく実行可能なアクターです。

/**
 * 1回目はエラーになるアクター
 */
class FirstErrorActor extends Actor {

  
  self.start()

  def receive = { 
    case msg => { 
      println(msg + " by FirstErrorActor")
      if(Flag.isError) throw new Exception("FirstErrorActor Error")
      println("FirstErrorActor receive done.")
    }
  }

  override def preRestart(reason: Throwable) = {
	  println("FirstErrorActor pre Restart.")
	  Flag.isError = false
  }

  override def postRestart(reason: Throwable) = {
	  println("FirstErrorActor post Restart.")
  }
}

/**
 * 通常のアクター
 */
class NomalActor extends Actor {

  self.start()

  def receive = { 
    case msg => { 
      println(msg + " by NomalActor")
    }
  }

  override def preRestart(reason: Throwable) = {
	  println("NomalActor pre Restart.")
  }

  override def postRestart(reason: Throwable) = {
	  println("NomalActor post Restart.")
  }
}

それぞれのアクターがオーバーライドしているpreRestartとpostRestartは、Supervisorによって再スタートする前/後に実行されます。

Mainクラスでは2つのアクターを作成したあと、Supervisorに登録しています。

val supervisor = Supervisor(
      SupervisorConfig(
        AllForOneStrategy(List(classOf[Exception]),//監視対象の例外
        3, //再スタートリトライ回数
        1000),//指定時間
        Supervise(myAkkaActor1 , Permanent) 
        :: Supervise(myAkkaActor2 , Permanent) :: Nil)
  ).start

SupervisorConfigでは第1引数にAllForOneStrategy(1つがエラーになったら全部再起動)を指定します。
AllForOneStrategyの第4引数以降はSuperviseを指定します。
Superviseの第1引数にアクターのインスタンスを指定し、第2引数にPermanent(常に再起動)を指定します。
これをListでつなげてSupervisorに渡せば設定が完了します。

設定が完了したら、NomalActor,FirstErrorActor,FirstErrorActorの順番にメッセージを送ります。

  println("-----NomalActor receive-----")
  myAkkaActor2 ! "success!" 

  Thread.sleep(5000)

  println("-----FirstErrorActor receive(Error)-----")
  myAkkaActor1 ! "error!" 

  Thread.sleep(5000)

  println("-----FirstErrorActor receive(Success)-----")
  myAkkaActor1 ! "success!" 

実行結果下記のようになるはずです。

> run
[info] Running Main 
-----NomalActor receive-----
success! by NomalActor
-----FirstErrorActor receive(Error)-----
error! by FirstErrorActor
NomalActor pre Restart.
NomalActor post Restart.
[ERROR]   [12/01/31 17:59] [akka:event-driven:dispatcher:global-2] [LocalActorRef] FirstErrorActor Error
java.lang.Exception: FirstErrorActor Error
	at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:24)
	at FirstErrorActor$$anonfun$receive$1.apply(MyAkka.scala:21)
	at akka.actor.Actor$class.apply(Actor.scala:545)
	at FirstErrorActor.apply(MyAkka.scala:16)
・・・・・・
・・・・・・
FirstErrorActor pre Restart.
FirstErrorActor post Restart.
-----FirstErrorActor receive(Success)-----
success! by FirstErrorActor
FirstErrorActor receive done.

FirstErrorActorの1回目の実行でエラーが起こった後、NomalActorの再スタートが行われているのがわかります。
その後FirstErrorActorも再スタート処理が行われ、2回目は問題なく処理が終了しています。

まとめ

今回はAkkaのFault Tolerance機能に少しだけふれて見ました。
簡単なサンプルでしたが、この機能を使えばエラー発生時のリカバリ処理が比較的容易に記述できますね。

参考サイトなど

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.